DEBUG = 0
-class OutOfOrderError(RuntimeError):
- """Error reported when a response message arrives out of order.
- """
- pass
-
class Responder:
"""Handler for a response to a message with a specified id.
"""
self.dom = None
self.channel = None
self.idx = None
- self.responders = []
+ self.responders = {}
self.timeout = 10
def setTimeout(self, timeout):
def addResponder(self, mid, deferred):
"""Add a responder for a message id.
The I{deferred} is called with callback(msg) when a response
- with message id I{mid} arrives. Responses are expected
- to arrive in order of message id. When a response arrives,
- waiting responders for messages with lower id have errback
- called with an OutOfOrder error.
+ with message id I{mid} arrives.
Responders have a timeout set and I{deferred} will error
on expiry.
@return: responder
@rtype: Responder
"""
+ resp = Responder(mid, deferred)
+ self.responders[resp.mid] = resp
if self.timeout > 0:
deferred.setTimeout(self.timeout)
- resp = Responder(mid, deferred)
- self.responders.append(resp)
return resp
def callResponders(self, msg):
hdr = msg.get_header()
mid = hdr['id']
handled = 0
- while self.responders:
- resp = self.responders[0]
- if resp.mid > mid:
- break
- self.responders.pop()
- if resp.mid < mid:
- print 'callResponders> Out of order:', resp.mid, mid
- resp.error(OutOfOrderError())
- else:
- handled = 1
- resp.responseReceived(msg)
- break
+ resp = self.responders.get(mid)
+ if resp:
+ handled = 1
+ resp.responseReceived(msg)
+ del self.responders[mid]
+ # Clean up called responders.
+ for resp in self.responders.values():
+ if resp.deferred.called:
+ del self.responders[resp.mid]
return handled
def lostChannel(self):